package cn.cup.dmp.report

import java.util.Properties

import cn.cup.dmp.config.ConfigHelper
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * DMP regional (province/city) distribution report.
 *
 * Reads the preprocessed log parquet files, aggregates request / bid /
 * impression / click / cost metrics per (province, city) via Spark SQL,
 * and writes the result table to the configured JDBC database.
 *
 * NOTE(review): `setMaster("local[*]")` is hardcoded, which overrides any
 * master passed via spark-submit — presumably intentional for local runs;
 * confirm before deploying to a cluster.
 */
object User_Area_Distribute_Analysis_BySQL {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DMP平台-地域报表").setMaster("local[*]")
    // Use the serializer configured for the project (typically Kryo).
    conf.set("spark.serializer", ConfigHelper.ser)
    val sc = new SparkContext(conf)
    val sql = new SQLContext(sc)

    // Ensure the SparkContext is stopped even if the job fails partway.
    try {
      // Read the preprocessed parquet log files.
      val dataFrame: DataFrame = sql.read.parquet(ConfigHelper.destPath)

      // Register a temporary table for SQL queries.
      // (registerTempTable is deprecated in Spark 2.x in favor of
      // createOrReplaceTempView; kept here for compatibility with the
      // SQLContext-era API this file uses.)
      dataFrame.registerTempTable("row_logs")

      // Aggregate per (province, city) using conditional sums:
      // request-funnel counts, bid/win counts, consumption/cost, and
      // impression/click counts.
      val da: DataFrame = sql.sql(
        """
select provincename province,cityname city,
sum(if (requestmode = 1 and processnode >= 1, 1,0)) original_request_number,
sum(if (requestmode = 1 and processnode >= 2, 1,0)) valid_requests,
sum(if (requestmode = 1 and processnode >= 3,1,0)) meet_condition_request,
sum(if (iseffective = 1 and isbilling = 1 and isbid = 1 and adorderid != 0, 1,0)) partitionbidsNum,
sum(if (iseffective = 1 and isbilling = 1 and iswin = 1, 1,0)) success_bid_Num,
sum(if (iseffective = 1 and isbilling = 1 and iswin = 1, winprice/1000,0)) everytime_consump,
sum(if (iseffective = 1 and isbilling = 1 and iswin = 1, adpayment/1000,0)) everytime_cost,
sum(if (requestmode = 2 and iseffective = 1, 1,0)) show_times,
sum(if (requestmode = 3 and iseffective = 1, 1,0)) click_times
from row_logs group by provincename,cityname
        """.stripMargin)

      // Write the report to the configured JDBC table.
      val connectProperties = new Properties()
      connectProperties.setProperty("user", ConfigHelper.username)
      connectProperties.setProperty("password", ConfigHelper.password)
      da.write.jdbc(ConfigHelper.url, ConfigHelper.app_Analysis_table, connectProperties)
    } finally {
      sc.stop()
    }
  }

}
